{-# OPTIONS_GHC -fno-warn-deprecations #-}
#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.SVar.Generate
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.SVar.Generate
    {-# DEPRECATED "SVar is replaced by Channel." #-}
    (
    -- * Write to SVar
      toSVar

    -- * Read from SVar
    -- $concurrentEval
    , fromSVar
    , fromSVarD
    -- , fromStreamVar
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import qualified Streamly.Internal.Data.Stream as D
    (Stream(..), Step(..))
import qualified Streamly.Internal.Data.StreamK as K
    (Stream, foldStreamShared, mkStream, foldStream)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (fromStreamK)

import Streamly.Internal.Data.SVar

#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif


------------------------------------------------------------------------------
-- Generating streams from SVar
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually the SVar is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the SVar and a
-- single puller that pulls them from SVar generating the evaluated stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <------SVar<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-------------------------------------------------------------------------------
-- Write a stream to an SVar
-------------------------------------------------------------------------------

-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
toSVar :: MonadAsync m => SVar SerialT m a -> SerialT m a -> m ()
toSVar sv m = do
    runIn <- askRunInIO
    liftIO $ enqueue sv (runIn, m)
    done <- allThreadsDone sv
    -- XXX This is safe only when called from the consumer thread or when no
    -- consumer is present.  There may be a race if we are not running in the
    -- consumer thread.
    -- XXX do this only if the work queue is not empty. The work may have been
    -- carried out by existing workers.
    when done $
        case yieldRateInfo sv of
            Nothing -> pushWorker 0 sv
            Just _  -> pushWorker 1 sv

-------------------------------------------------------------------------------
-- Read a stream from an SVar
-------------------------------------------------------------------------------

-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: forall m a. MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromStreamVar sv = K.mkStream $ \st yld sng stp -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    K.foldStream st yld sng stp $ processEvents $ reverse list

    where

    allDone :: forall r. m r -> m r
    allDone stp = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        stp

    {-# INLINE processEvents #-}
    processEvents [] = K.mkStream $ \st yld sng stp -> do
        done <- postProcess sv
        if done
        then allDone stp
        else K.foldStream st yld sng stp $ fromStreamVar sv

    processEvents (ev : es) = K.mkStream $ \st yld sng stp -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> do
                        stop <- shouldStop tid
                        if stop
                        then liftIO (cleanupSVar sv) >> allDone stp
                        else K.foldStream st yld sng stp rest
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                K.foldStream st yld sng stp rest
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex
    shouldStop tid =
        case svarStopStyle sv of
            StopNone -> return False
            StopAny -> return True
            StopBy -> do
                sid <- liftIO $ readIORef (svarStopBy sv)
                return $ tid == sid

#ifdef INSPECTION
#if __GLASGOW_HASKELL__ != 906
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromStreamVar leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class.
--
inspect $ hasNoTypeClassesExcept 'fromStreamVar
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif
#endif

-- | Generate a stream from an SVar.  An unevaluated stream can be pushed to an
-- SVar using 'toSVar'.  As we pull a stream from the SVar the input stream
-- gets evaluated concurrently. The evaluation depends on the SVar style and
-- the configuration parameters e.g. using the maxBuffer/maxThreads
-- combinators.
--
{-# INLINE fromSVar #-}
fromSVar :: MonadAsync m => SVar K.Stream m a -> SerialT m a
fromSVar sv =
    Stream.fromStreamK $ K.mkStream $ \st yld sng stp -> do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- We pass a copy of sv to fromStreamVar, so that we know that it has
        -- no other references, when that copy gets garbage collected "ref"
        -- will get garbage collected and our hook will be called.
        K.foldStreamShared st yld sng stp $
            fromStreamVar sv{svarRef = Just ref}
    where

    hook = do
        when (svarInspectMode sv) $ do
            r <- liftIO $ readIORef (svarStopTime (svarStats sv))
            when (isNothing r) $
                printSVar sv "SVar Garbage Collected"
        cleanupSVar sv
        -- If there are any SVars referenced by this SVar a GC will prompt
        -- them to be cleaned up quickly.
        when (svarInspectMode sv) performMajorGC

data FromSVarState t m a =
      FromSVarInit
    | FromSVarRead (SVar t m a)
    | FromSVarLoop (SVar t m a) [ChildEvent a]
    | FromSVarDone (SVar t m a)

-- | Like 'fromSVar' but generates a StreamD style stream instead of CPS.
--
{-# INLINE_NORMAL fromSVarD #-}
fromSVarD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromSVarD svar = D.Stream step FromSVarInit
    where

    {-# INLINE_LATE step #-}
    step _ FromSVarInit = do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv = svar{svarRef = Just ref}
        return $ D.Skip (FromSVarRead sv)

        where

        {-# NOINLINE hook #-}
        hook = do
            when (svarInspectMode svar) $ do
                r <- liftIO $ readIORef (svarStopTime (svarStats svar))
                when (isNothing r) $
                    printSVar svar "SVar Garbage Collected"
            cleanupSVar svar
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            when (svarInspectMode svar) performMajorGC

    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    step _ (FromSVarLoop sv []) = do
        done <- postProcess sv
        return $ D.Skip $ if done
                      then FromSVarDone sv
                      else FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> do
                        stop <- shouldStop tid
                        if stop
                        then do
                            liftIO (cleanupSVar sv)
                            return $ D.Skip (FromSVarDone sv)
                        else return $ D.Skip (FromSVarLoop sv es)
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                return $ D.Skip (FromSVarLoop sv es)
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex
        where

        shouldStop tid =
            case svarStopStyle sv of
                StopNone -> return False
                StopAny -> return True
                StopBy -> do
                    sid <- liftIO $ readIORef (svarStopBy sv)
                    return $ tid == sid

    step _ (FromSVarDone sv) = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        return D.Stop
